package com.ken.debugonline.sender;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
public class MsgSender {
    private BlockingQueue<Integer> needSend = new LinkedBlockingQueue<>(5);
    Object connectionMonitor = new Object();

    /**
     * 将消息放入队列中,使用线程异步发送,若队列已满则同步发送
     * @param msg
     */
    public void sendMsg(Integer msg){
        boolean offer = needSend.offer(msg);
        // 若队列已满则同步发送
        if (!offer){
            this.sender(msg);
//            log.warn("send queue is full.msg:{}",msg);
        }
    }

    private void sender(Integer msg){
        // 该锁模拟rabbitmq获取connection是的逻辑 CachingConnectionFactory.java-> createConnection
        synchronized (connectionMonitor){
            try {
                // 模拟mq建立连接时的网络阻塞
                Thread.sleep(20000);
                log.info("send msg:{} to mq",msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 消费队列中需要发送的msg
     */
    public void init(){
        new Thread(()->{
            while (true){
                try{
                    Integer msg = this.needSend.poll();
                    this.sender(msg);
                }catch (Exception e){
                    log.info("send msg error.",e);
                }
            }
        }).start();
    }

}
